/*!

\page so_5_5__in_depth__mchains so-5.5 In Depth - Message Chains

\section so_5_5__in_depth__mchains__purpose Purpose

The main purpose of the <i>message chain</i> (or just <i>mchain</i>) mechanism
is to provide a way for the SObjectizer and non-SObjectizer parts of an
application to interact. Passing data from the non-SObjectizer part to agents
is very simple: the usual message passing via mboxes is used. For example:

\code
class worker : public so_5::agent_t {
    public:
        ...
        virtual void so_define_agent() override {
            // A mbox to be used to receive requests.
            auto mbox = so_environment().create_mbox("requests");
            // Subscription to messages from that mbox...
            so_subscribe(mbox).event(&worker::request_handler1)
                    .event(&worker::request_handler2)
                    .event(&worker::request_handler3)
                    ...;
        }
        ...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
launch_sobjectizer(...);
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
so_5::send<request1>(mbox, ...);
so_5::send<request2>(mbox, ...);
...
\endcode

But how can the non-SObjectizer part of the application receive messages back
from an agent?

A message chain is the answer.

A message chain looks almost like an mbox to agents. An agent can send messages
to an mchain exactly the same way as to an mbox. So an mchain can be passed to
an agent, and the agent will use it as the destination for reply messages. On
the other side of the mchain there will be a non-SObjectizer message handler.
This handler receives messages from the mchain by using special API functions
and handles them appropriately.

\code
class worker : public so_5::agent_t {
    public:
        worker(context_t ctx, so_5::mbox_t reply_to)
            : so_5::agent_t{ctx}, m_reply_to{std::move(reply_to)}
            {...}
        ...
        virtual void so_define_agent() override {
            // A mbox to be used to receive requests.
            auto mbox = so_environment().create_mbox("requests");
            // Subscription to messages from that mbox...
            so_subscribe(mbox).event(&worker::request_handler1)
                    .event(&worker::request_handler2)
                    .event(&worker::request_handler3)
                    ...;
        }
    private:
        // A mbox for sending responses.
        const so_5::mbox_t m_reply_to;
        ...
        // Request handler.
        void request_handler1(const request1 & r) {
            ... // Some processing.
            // Sending response back as a message.
            so_5::send<reply1>(m_reply_to, ...);
        }
        ...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
// Chain for receiving responses.
so_5::mchain_t reply_ch = sobj.environment().create_mchain(
    // Parameters for the simplest chain without any limitations.
    so_5::make_unlimited_mchain_params());
launch_sobjectizer(
    // Passing mchain as a parameter.
    // Cast it to mbox to look exactly like ordinary mbox.
    reply_ch.as_mbox(), ...);
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
// And receive responses.
so_5::send<request1>(mbox, ...);
receive(reply_ch, so_5::infinite_wait, [](const reply1 &) {...});
so_5::send<request2>(mbox, ...);
receive(reply_ch, so_5::infinite_wait, [](const reply2 &) {...});
...
\endcode

\section so_5_5__in_depth__mchains__more More Details

\subsection so_5_5__in_depth__mchains__more__types Types of Mchains

There are two types of mchains:

1. Unlimited mchains. These mchains have no limit on the number of messages
   stored inside the mchain. That number is limited only by the amount of
   available memory and by the common sense of a developer.
2. Limited mchains. These mchains have a strict limit on the number of
   messages. It is impossible to just push yet another message into a full
   size-limited mchain.

The type of an mchain is specified at creation time. Once an mchain is created,
its type cannot be changed.

The type of an mchain is determined by the content of the
`so_5::mchain_params_t` instance passed to the
`so_5::environment_t::create_mchain` method. There are three helper functions
which return properly initialized `mchain_params_t` instances:

\code
// Create size-unlimited mchain.
so_5::mchain_t m1 = env.create_mchain(so_5::make_unlimited_mchain_params());

// Create size-limited mchain which does not wait when a new message is
// pushed to the full mchain.
so_5::mchain_t m2 = env.create_mchain(
	so_5::make_limited_without_waiting_mchain_params(
		200,
		so_5::mchain_props::memory_usage_t::dynamic,
		so_5::mchain_props::overflow_reaction_t::drop_newest));

// Create size-limited mchain which waits for some time when a new message is
// pushed to the full mchain.
so_5::mchain_t m3 = env.create_mchain(
	so_5::make_limited_with_waiting_mchain_params(
		200,
		so_5::mchain_props::memory_usage_t::dynamic,
		so_5::mchain_props::overflow_reaction_t::drop_newest,
		std::chrono::milliseconds(500)));
\endcode

The type `so_5::mchain_t` is like the type `so_5::mbox_t`: it is an alias for a
smart intrusive pointer to `so_5::abstract_message_chain_t`. It means that an
mchain created by `create_mchain` will be destroyed automatically after the
destruction of the last `mchain_t` object pointing to it.

\subsubsection so_5_5__in_depth__mchains__more__types__size_limited More About Size-limited Mchain

Size-limited mchains differ from size-unlimited mchains in one important way: a
size-limited mchain can't contain more messages than its maximum capacity. So
there must be some reaction to an attempt to add another message to a full
mchain.

A size-limited mchain can wait for some time on an attempt to push a new
message to the full mchain. If there is free space in the mchain after that
wait, the new message is stored in the mchain. Otherwise the appropriate
overflow reaction is performed.

A size-limited mchain can also work without any waiting on a full mchain. If
there is no free space in the mchain, the appropriate overflow reaction is
performed immediately.

There are four overflow reactions which can be selected for an mchain at the
moment of mchain creation:

1. Dropping the new message (`drop_newest`). The new message is simply ignored.
2. Removing the oldest message from the mchain (`remove_oldest`). The oldest
   message in the mchain is removed and will not be processed.
3. Throwing an exception (`throw_exception`). An exception of type
   `so_5::exception_t` with error code `so_5::rc_msg_chain_overflow` is raised
   as the result of an attempt to push a new message to the full mchain.
4. Aborting the whole application by calling `std::abort()` (`abort_app`).

All these variants are described as items of the enumeration
`so_5::mchain_props::overflow_reaction_t`.
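
The four reactions can be modeled outside SObjectizer with a toy queue. The
sketch below is only an illustration and not the library's implementation:
`bounded_queue` and the `overflow_reaction` enumeration are hypothetical names,
the queue holds plain `int` values, and `std::overflow_error` stands in for
`so_5::exception_t`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <stdexcept>

// Hypothetical enumeration which mirrors the four reactions listed above.
enum class overflow_reaction {
    drop_newest, remove_oldest, throw_exception, abort_app
};

// A toy size-limited FIFO of ints. Not SObjectizer's implementation.
class bounded_queue {
public:
    bounded_queue(std::size_t capacity, overflow_reaction reaction)
        : m_capacity{capacity}, m_reaction{reaction} {
        assert(capacity > 0);
    }

    // Returns true if the value was stored in the queue.
    bool push(int value) {
        if (m_items.size() < m_capacity) {
            m_items.push_back(value);
            return true;
        }
        switch (m_reaction) {
            case overflow_reaction::drop_newest:
                return false; // The new value is ignored.
            case overflow_reaction::remove_oldest:
                m_items.pop_front(); // The oldest value is lost unprocessed.
                m_items.push_back(value);
                return true;
            case overflow_reaction::throw_exception:
                throw std::overflow_error("queue is full");
            case overflow_reaction::abort_app:
                std::abort();
        }
        return false;
    }

    const std::deque<int> & items() const { return m_items; }

private:
    const std::size_t m_capacity;
    const overflow_reaction m_reaction;
    std::deque<int> m_items;
};
```

With a capacity of 2 and the values 1, 2, 3 pushed in that order, the
`drop_newest` queue keeps 1 and 2, while the `remove_oldest` queue keeps 2
and 3.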

There is one more important property which must be specified for a
size-limited mchain at creation time: the type of memory usage.

Memory for storing messages inside an mchain can be used dynamically: it is
allocated when the mchain grows and deallocated when the mchain shrinks.

Alternatively, memory for an mchain can be preallocated, in which case there is
a fixed-size buffer for messages. The size of that buffer does not change as
the mchain grows and shrinks.

The types of memory usage are described as items of the enumeration
`so_5::mchain_props::memory_usage_t`.
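
To make the difference concrete, a preallocated buffer behaves roughly like the
fixed-size ring buffer sketched below. This is a stand-alone model under
assumptions, not SObjectizer code: `preallocated_fifo` is a hypothetical name
and the buffer stores plain `int` values. Dynamic memory usage would correspond
to a container such as `std::deque`, which allocates and frees memory as it
grows and shrinks.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy fixed-capacity FIFO: storage is allocated once in the constructor
// and is reused for the whole lifetime of the object.
class preallocated_fifo {
public:
    explicit preallocated_fifo(std::size_t capacity)
        : m_storage(capacity) {
        assert(capacity > 0);
    }

    // Returns false if the buffer is full.
    bool push(int value) {
        if (m_size == m_storage.size())
            return false;
        m_storage[(m_head + m_size) % m_storage.size()] = value;
        ++m_size;
        return true;
    }

    // Returns false if the buffer is empty.
    bool pop(int & out) {
        if (m_size == 0)
            return false;
        out = m_storage[m_head];
        m_head = (m_head + 1) % m_storage.size();
        --m_size;
        return true;
    }

    std::size_t size() const { return m_size; }
    std::size_t capacity() const { return m_storage.size(); }

private:
    std::vector<int> m_storage;
    std::size_t m_head = 0;
    std::size_t m_size = 0;
};
```

The capacity reported by this model stays the same no matter how many values
are pushed and popped, which is the property the preallocated mode is meant to
provide.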

All these traits of size-limited mchains are controlled by the
`mchain_params_t` object passed to the `create_mchain` method. The simplest and
recommended way of preparing the corresponding `mchain_params_t` is to use the
`make_limited_without_waiting_mchain_params` and
`make_limited_with_waiting_mchain_params` helper functions:

\code
so_5::environment_t & env = ...;
so_5::mchain_t chain1 = env.create_mchain(
    so_5::make_limited_without_waiting_mchain_params(
        // No more than 200 messages in the chain.
        200,
        // Memory will be allocated dynamically.
        so_5::mchain_props::memory_usage_t::dynamic,
        // New messages will be ignored on chain's overflow.
        so_5::mchain_props::overflow_reaction_t::drop_newest));
so_5::mchain_t chain2 = env.create_mchain(
    so_5::make_limited_with_waiting_mchain_params(
        // No more than 200 messages in the chain.
        200,
        // Memory will be preallocated.
        so_5::mchain_props::memory_usage_t::preallocated,
        // The oldest message will be removed on chain's overflow.
        so_5::mchain_props::overflow_reaction_t::remove_oldest,
        // But before that there will be a wait of up to 500ms for free space.
        std::chrono::milliseconds(500)));
\endcode


\subsection so_5_5__in_depth__mchains__more__receive How To Receive And Handle Messages From Mchain?

There are two variants of the `so_5::receive` function which allow receiving
and handling messages from an mchain.

The first variant of `receive` takes a `mchain_t`, a timeout and a list of
message handlers:

\code
so_5::mchain_t ch = env.create_mchain(...);
...
receive(ch, std::chrono::milliseconds(500),
	[](const reply1 & r) {...},
	[](const reply2 & r) {...},
	...
	[](const replyN & r) {...});
\endcode

It checks the mchain and waits for no more than 500ms if the mchain is empty.
If the mchain is not empty, it extracts just one message from the mchain and
tries to find an appropriate handler for it. If a handler is found, it is
called. If no handler is found, the extracted message is discarded without any
processing.

If the mchain is still empty after the specified timeout, `receive` does
nothing.

There are two special values which can be used as timeout:

- value `so_5::no_wait` specifies zero timeout. It means that `receive` will
  not wait if the mchain is empty;
- value `so_5::infinite_wait` specifies unlimited waiting. `receive` returns
  only when a message arrives or when the mchain is closed explicitly.
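
The timeout rules can be modeled with a condition variable. The sketch below is
an assumption-based illustration in standard C++ and does not show how
`receive` is implemented: `waiting_queue` and `pop_for` are hypothetical names.
A zero timeout plays the role of `so_5::no_wait`, a very long timeout
approximates `so_5::infinite_wait`, and closing the queue wakes up a waiting
consumer.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

// Toy queue with a timed extraction. Not SObjectizer's implementation.
class waiting_queue {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_items.push(value);
        }
        m_wakeup.notify_one();
    }

    // After close() no consumer waits on an empty queue anymore.
    void close() {
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_closed = true;
        }
        m_wakeup.notify_all();
    }

    // Extracts at most one value. Waits no longer than `timeout` if the
    // queue is empty and returns an empty optional if nothing arrived.
    std::optional<int> pop_for(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock{m_mutex};
        m_wakeup.wait_for(lock, timeout,
            [this] { return !m_items.empty() || m_closed; });
        if (m_items.empty())
            return std::nullopt;
        const int value = m_items.front();
        m_items.pop();
        return value;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_wakeup;
    std::queue<int> m_items;
    bool m_closed = false;
};
```

Calling `pop_for` with a zero timeout on an empty queue returns immediately,
and calling it on a closed empty queue returns without waiting regardless of
the timeout.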

There is also a more advanced version of `so_5::receive` which can receive and
handle more than one message from an mchain. It takes a
`so_5::mchain_receive_params_t` object with a list of conditions and returns
control as soon as any of those conditions becomes true. For example:

\code
// Handle 3 messages.
// If there are fewer than 3 messages in the mchain then wait for them.
// The receive returns control when 3 messages have been processed or when
// the mchain is closed explicitly.
receive( from(chain).handle_n( 3 ),
      handlers... );

// Handle 3 messages but wait no more than 200ms on empty mchain.
// If mchain is empty for more than 200ms then receive returns control even
// if less than 3 messages were handled.
receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
      handlers... );

// Handle all messages which are going to the mchain. But handling will be finished
// if a pause between arrivals of messages exceeds 500ms.
receive( from(chain).empty_timeout( milliseconds(500) ),
      handlers... );

// Handle all messages for 2s.
// The return from receive will be after 2s or if the mchain is closed explicitly.
receive( from(chain).total_time( seconds(2) ),
      handlers... );

// The same as the previous one, but the return can also occur after
// extraction of 1000 messages.
receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
      handlers... );
\endcode

There is a difference between the number of extracted messages and the number
of handled messages. When a message is extracted from an mchain, the count of
extracted messages increases. But the count of handled messages increases only
if a handler for that message type is found and called. It means that the
number of extracted messages can be greater than the number of handled
messages.
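
The distinction can be shown with a small stand-alone model. It is a sketch
under assumptions and not the way `receive` dispatches messages: `drain`,
`receive_counts` and `handler_map` are hypothetical names, and `std::any`
stands in for a message of arbitrary type.

```cpp
#include <any>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

// Counters similar in spirit to what a receive operation reports.
struct receive_counts {
    std::size_t extracted = 0;
    std::size_t handled = 0;
};

// One handler per message type, keyed by the type of the message.
using handler_map = std::unordered_map<
    std::type_index, std::function<void(const std::any &)>>;

// Extracts every queued message. Only messages with a matching handler
// are counted as handled.
inline receive_counts drain(
    std::queue<std::any> & chain, const handler_map & handlers) {
    receive_counts counts;
    while (!chain.empty()) {
        std::any msg = std::move(chain.front());
        chain.pop();
        ++counts.extracted;
        const auto it = handlers.find(std::type_index(msg.type()));
        if (it != handlers.end()) {
            it->second(msg);
            ++counts.handled;
        }
        // Without a handler the message is discarded unprocessed.
    }
    return counts;
}
```

If the queue holds two `int` values and one `std::string`, and only a handler
for `int` is registered, this model reports three extracted messages and two
handled ones.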

Both variants of `receive` return an object of type
`so_5::mchain_receive_result_t`. Methods of that object provide the number of
extracted and handled messages, and also the status of the `receive` operation.

The advanced version of `receive` could be used like this:

\code
so_5::send< a_supervisor::ask_status >( req_mbox );
auto r = receive(
      from( chain ).empty_timeout( milliseconds(200) ),
      []( a_supervisor::status_idle ) {
         cout << "status: IDLE" << endl;
      },
      []( a_supervisor::status_in_progress ) {
         cout << "status: IN PROGRESS" << endl;
      },
      []( a_supervisor::status_finished v ) {
         cout << "status: finished in " << v.m_ms << "ms" << endl;
      } );
if( !r.handled() )
   cout << "--- no response from supervisor ---" << endl; 
\endcode

\subsubsection so_5_5__in_depth__mchains__more__receive__handlers Message Handlers

Message handlers passed to the `receive` family of functions must be lambda
functions or function objects of one of the following forms:

\code
return_type handler(const message_type&);
return_type handler(message_type);
\endcode

For example:

\code
struct classical_message : public so_5::message_t { ... };
struct user_message { ... };
...
receive(chain, so_5::infinite_wait,
	[](const classical_message & m) {...},
	[](const user_message & m) {...},
	[](const int & m) {...},
	[](long m) {...});
\endcode

A handler for a signal of type `signal_type` can be created with the help of
the `so_5::handler<signal_type>()` function:

\code
struct stopped : public so_5::signal_t {};
struct waiting : public so_5::signal_t {};
...
receive(chain, so_5::infinite_wait,
	[](const classical_message & m) {...},
	[](const user_message & m) {...},
	so_5::handler<stopped>( []{...} ),
	so_5::handler<waiting>( []{...} ));
\endcode

<b>NOTE!</b> <i>All message handlers must handle different message types. It is an error to define several handlers for the same message type.</i>

\subsection so_5_5__in_depth__mchains__more__send_funcs Usage Of Mchains With Send Functions

All traditional send functions like `so_5::send`, `so_5::send_delayed` and
`so_5::send_periodic` work with mchains the same way they work with mboxes.
This makes it possible to write code in the traditional way:

\code
so_5::mchain_t ch = env.create_mchain(...);
...
// Sending a message.
so_5::send<my_message>(ch, ... ); // some args for my_message's constructor.

// Sending a delayed message.
so_5::send_delayed<my_message>(ch, std::chrono::milliseconds(200),
	... ); // some args for my_message's constructor

// Sending a periodic message.
auto timer_id = so_5::send_periodic<my_message>(ch,
	std::chrono::milliseconds(200),
	std::chrono::milliseconds(250),
	... ); // some args for my_message's constructor
\endcode

The functions for performing service requests, `so_5::request_value` and
`so_5::request_future`, work with mchains too. It means that an agent can make
a service request to the non-SObjectizer part of an application and receive the
result of that request as usual:

\code
// Somewhere in SObjectizer part of an application...
class worker : public so_5::agent_t {
...
	const so_5::mchain_t m_request_ch;

	void some_event_handler() {
		auto f = so_5::request_future<response, request>(m_request_ch,
			... ); // some args for request's constructor
		...
		handle_response(f.get());
	}
};
...
// Somewhere in non-SObjectizer part of an application...
const so_5::mchain_t request_ch = ...;
...
receive(from(request_ch).handle_n(1000),
	[](const request & req) -> response {...},
	...);
\endcode

\subsection so_5_5__in_depth__mchains__more__as_mbox Mchains as Mboxes

There is a method `so_5::abstract_message_chain_t::as_mbox()` which represents
an mchain as an almost ordinary mbox. This method returns `so_5::mbox_t`, and
this mbox can be used for sending messages and performing service requests to
the mchain.

The method `as_mbox()` can be useful if there is a need to hide the existence
of the mchain. For example, an agent inside the SObjectizer part of an
application can receive an mbox and think that there is another agent on the
opposite side:

\code
// SObjectizer part of an application.
class worker : public so_5::agent_t {
public:
	worker(context_t ctx, so_5::mbox_t request_mbox)
		: so_5::agent_t{ctx}, m_request_mbox{std::move(request_mbox)}
		{}
	...
private:
	const so_5::mbox_t m_request_mbox;
	...
	void some_event_handler() {
		auto f = so_5::request_future<response, request>(m_request_mbox,
			... ); // some args for request's constructor
		...
		handle_response(f.get());
	}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mchain(...);
env.introduce_coop([&](so_5::coop_t & coop) {
	coop.make_agent<worker>(
		// Passing mchain as mbox to worker agent.
		ch->as_mbox());
...
} );
...
receive(from(ch).handle_n(1000),
	[](const request & req) -> response {...},
	...);
\endcode

The only difference between an ordinary mbox created by
`so_5::environment_t::create_mbox()` and an mbox created by `as_mbox()` is that
the latter does not allow subscriptions. It means that the agent `worker` from
the example above can't subscribe its event handlers to messages from
`m_request_mbox`.

The only case when an mchain needs to be passed to the SObjectizer part
directly as an mchain (without casting it to an mbox by `as_mbox()`) is when
the mchain must be closed explicitly in the SObjectizer part of an application:

\code
class data_producer : public so_5::agent_t {
public:
	data_producer(context_t ctx, so_5::mchain_t data_ch)
		: so_5::agent_t{ctx}, m_data_ch{std::move(data_ch)}
	{}
	...
private:
	const so_5::mchain_t m_data_ch;
	...
	void read_data_event() {
		auto data = read_next_data();
		if(data_read())
			so_5::send<data_sample>(m_data_ch, data);
		else {
			// Data source is closed. No more data can be received.
			// Close the data chain and finish our work.
			m_data_ch->close();
			so_deregister_coop_normally();
		}
	}
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mchain(...);
env.introduce_coop([&](so_5::coop_t & coop) {
	coop.make_agent<data_producer>(
		// Passing mchain as mchain to data_producer agent.
		ch);
...
} );
...
// Receive and handle all data samples until the chain is closed.
receive(from(ch), [](const data_sample & data) {...});
\endcode

\subsection so_5_5__in_depth__mchains__more__notificator Non-Empty Notificator For Mchain

It is possible to specify a function which will be called automatically
when a message is stored into an empty mchain:

\code
so_5::environment_t & env = ...;
so_5::mchain_t ch = env.create_mchain(
	so_5::make_unlimited_mchain_params().non_empty_notificator( []{ ... } ));
\endcode

This feature can be used in GUI applications, for example. Suppose a widget
creates an mchain and needs to know when there are messages in that mchain.
The widget can add a notificator to the mchain and send a GUI message or
signal to itself from that notificator. One of the `receive` functions is then
called while that GUI message or signal is being processed.
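
The moment at which such a notificator fires can be modeled in a few lines. The
sketch below is an illustration under assumptions, not SObjectizer code:
`notifying_queue` is a hypothetical name, the queue holds plain `int` values,
and the callback is invoked only when a value is stored into an empty queue.

```cpp
#include <functional>
#include <queue>
#include <utility>

// Toy queue which calls a user-supplied function on every transition
// from the empty state to the non-empty state.
class notifying_queue {
public:
    explicit notifying_queue(std::function<void()> on_non_empty)
        : m_on_non_empty{std::move(on_non_empty)} {}

    void push(int value) {
        const bool was_empty = m_items.empty();
        m_items.push(value);
        // Pushing into a non-empty queue does not trigger the callback.
        if (was_empty && m_on_non_empty)
            m_on_non_empty();
    }

    // Returns false if the queue is empty.
    bool pop(int & out) {
        if (m_items.empty())
            return false;
        out = m_items.front();
        m_items.pop();
        return true;
    }

private:
    std::function<void()> m_on_non_empty;
    std::queue<int> m_items;
};
```

In this model three consecutive pushes into an empty queue trigger the callback
once. The callback is triggered again only after the queue has been drained and
a new value arrives, so the consumer is expected to extract everything
available each time it is notified.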

\subsection so_5_5__in_depth__mchains__more__mpsc Mchain Has Multi-Producer/Single-Consumer Queue

The mchain mechanism introduced in v.5.5.13 uses an MPSC queue inside. It means
that `receive` on an mchain is expected to be called from a single thread.

Nothing will break if several `receive` calls are made for one mchain from
different threads at the same time (no messages will be lost or processed
several times). But those threads may work inefficiently (thread starvation,
for example).

*/

// vim:ft=cpp

